00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _serial_io_h_
00019 #define _serial_io_h_
00020
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include <boost/smart_ptr/shared_ptr.hpp>
#include <ga.h>
#include "gridpack/parallel/distributed.hpp"
#include "gridpack/network/base_network.hpp"
#include "gridpack/component/base_component.hpp"
#include "gridpack/utilities/exception.hpp"
#ifdef USE_GOSS
#include "gridpack/serial_io/goss_utils.hpp"
#endif
00030
00031 namespace gridpack {
00032 namespace serial_io {
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 template <class _network>
00043 class SerialBusIO {
00044 public:
00045
00046
00047
00048
00049
00050
00051 SerialBusIO(int max_str_len,
00052 boost::shared_ptr<_network> network)
00053 {
00054 p_GAgrp = network->communicator().getGroup();
00055 p_GA_type = NGA_Register_type(max_str_len);
00056 p_network = network;
00057 p_size = max_str_len;
00058
00059
00060
00061 int nbus = p_network->totalBuses();
00062 int one = 1;
00063 p_stringGA = GA_Create_handle();
00064 GA_Set_data(p_stringGA,one,&nbus,p_GA_type);
00065 GA_Set_pgroup(p_stringGA, p_GAgrp);
00066 GA_Allocate(p_stringGA);
00067 p_maskGA = GA_Create_handle();
00068 GA_Set_data(p_maskGA,one,&nbus,C_INT);
00069 GA_Set_pgroup(p_maskGA, p_GAgrp);
00070 GA_Allocate(p_maskGA);
00071 #ifdef USE_GOSS
00072 p_goss = NULL;
00073 p_channel = false;
00074 #endif
00075 }
00076
00077
00078
00079
00080 ~SerialBusIO(void)
00081 {
00082 NGA_Deregister_type(p_GA_type);
00083 GA_Destroy(p_stringGA);
00084 GA_Destroy(p_maskGA);
00085 this->close();
00086 }
00087
00088
00089
00090
00091
00092 void open(const char *filename)
00093 {
00094 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00095 this->close();
00096 p_fout.reset(new std::ofstream);
00097 p_fout->open(filename);
00098 }
00099 }
00100
00101
00102
00103
00104
00105 boost::shared_ptr<std::ofstream> getStream()
00106 {
00107 return p_fout;
00108 }
00109
00110
00111
00112
00113
00114 void setStream(boost::shared_ptr<std::ofstream> stream)
00115 {
00116 p_fout = stream;
00117 }
00118
00119
00120
00121
00122 void close()
00123 {
00124 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00125 if (p_fout) {
00126 if (p_fout->is_open()) p_fout->close();
00127 }
00128 }
00129 p_fout.reset();
00130 }
00131
00132
00133
00134
00135
00136
00137 void write(const char *signal = NULL)
00138 {
00139 if (p_fout) {
00140 write(*p_fout, signal);
00141 } else {
00142 write(std::cout, signal);
00143 }
00144 }
00145
00146 #ifdef USE_GOSS
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156 void openChannel(const char *topic)
00157 {
00158 if (!p_goss) p_goss = gridpack::goss::GOSSUtils::instance();
00159 if (!p_channel && GA_Pgroup_nodeid(p_GAgrp)==0) {
00160 gridpack::parallel::Communicator comm = p_network->communicator();
00161 p_goss->openGOSSChannel(comm,topic);
00162 p_channel = true;
00163 } else {
00164 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00165 printf("ERROR: Channel already opened\n");
00166 }
00167 }
00168 }
00169
00170
00171
00172
00173 void closeChannel()
00174 {
00175 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00176 gridpack::parallel::Communicator comm = p_network->communicator();
00177 p_goss->closeGOSSChannel(comm);
00178 p_channel = false;
00179 }
00180 }
00181 #endif
00182
00183
00184
00185
00186
00187
00188
00189 void header(const char *str)
00190 {
00191 if (p_fout) {
00192 header(*p_fout, str);
00193 } else {
00194 header(std::cout, str);
00195 }
00196 }
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206 template <class _data_type> void gatherData(std::vector<_data_type>
00207 &data_vector, const char* signal = NULL)
00208 {
00209 int nBus = p_network->numBuses();
00210 if (sizeof(_data_type) > p_size) {
00211 char buf[256];
00212 sprintf(buf,"SerialBusIO::gatherData: data_type size inconsistent"
00213 " with allocated size: data: %ld allocated: %d\n",
00214 sizeof(_data_type),p_size);
00215 printf("%s",buf);
00216 throw gridpack::Exception(buf);
00217 }
00218 _data_type data;
00219 int dsize = sizeof(_data_type);
00220 int nwrites = 0;
00221 int i;
00222 int one = 1;
00223 GA_Zero(p_maskGA);
00224
00225
00226 for (i=0; i<nBus; i++) {
00227 if (p_network->getActiveBus(i) &&
00228 p_network->getBus(i)->getDataItem(&data,signal)) {
00229 nwrites++;
00230 }
00231 }
00232
00233
00234 int *iptr;
00235 char *ptr;
00236 GA_Zero(p_maskGA);
00237 if (nwrites > 0) {
00238 std::vector<int*> index(nwrites);
00239 std::vector<int> indexbuf(nwrites);
00240 iptr = &indexbuf[0];
00241 std::vector<int> ones(nwrites);
00242 char *strbuf;
00243 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00244 ptr = strbuf;
00245 int ncnt = 0;
00246 for (i=0; i<nBus; i++) {
00247 if (ncnt >= nwrites) break;
00248 if (p_network->getActiveBus(i) &&
00249 p_network->getBus(i)->getDataItem(ptr,signal)) {
00250 index[ncnt] = iptr;
00251 *(index[ncnt]) = p_network->getGlobalBusIndex(i);
00252 ones[ncnt] = 1;
00253 ncnt++;
00254 ptr += p_size;
00255 iptr ++;
00256 }
00257 }
00258
00259
00260 if (ncnt > 0) {
00261 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
00262 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
00263 }
00264 if (nwrites*p_size > 0) delete [] strbuf;
00265 }
00266 GA_Pgroup_sync(p_GAgrp);
00267
00268
00269
00270 data_vector.clear();
00271 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00272 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
00273 int lo, hi;
00274 for (i=0; i<nprocs; i++) {
00275 NGA_Distribution(p_maskGA, i, &lo, &hi);
00276 int ld = hi - lo + 1;
00277
00278 std::vector<int> imask(ld);
00279 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
00280 int j;
00281 nwrites = 0;
00282 for (j=0; j<ld; j++) {
00283 if (imask[j] == 1) {
00284 nwrites++;
00285 }
00286 }
00287
00288 if (nwrites > 0) {
00289 char *iobuf;
00290 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
00291 std::vector<int*> index(nwrites);
00292 std::vector<int> indexbuf(nwrites);
00293 iptr = &indexbuf[0];
00294 nwrites = 0;
00295 for (j=0; j<ld; j++) {
00296 if (imask[j] == 1) {
00297 index[nwrites] = iptr;
00298 *(index[nwrites]) = j + lo;
00299 nwrites++;
00300 iptr++;
00301 }
00302 }
00303 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
00304 ptr = iobuf;
00305 nwrites = 0;
00306 for (j=0; j<ld; j++) {
00307 if (imask[j] == 1) {
00308 memcpy(&data,ptr,dsize);
00309 data_vector.push_back(data);
00310 ptr += p_size;
00311 nwrites++;
00312 }
00313 }
00314 if (p_size*nwrites > 0) delete [] iobuf;
00315 }
00316 }
00317 }
00318 GA_Pgroup_sync(p_GAgrp);
00319 }
00320
00321 #ifdef USE_GOSS
00322
00323
00324
00325
00326 void dumpChannel()
00327 {
00328 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00329 printf("Sending message of length %d\n",p_channel_buf.length());
00330 p_goss->sendGOSSMessage(p_channel_buf);
00331 }
00332 }
00333 #endif
00334
00335
00336
00337
00338
00339
00340
00341 std::vector<std::string> writeStrings(const char *signal = NULL)
00342 {
00343 int nBus = p_network->numBuses();
00344 char *string;
00345 int nwrites = 0;
00346 int i;
00347 int one = 1;
00348 std::vector<std::string> ret;
00349 string = (char*)malloc(p_size*sizeof(char));
00350
00351
00352 for (i=0; i<nBus; i++) {
00353 if (p_network->getActiveBus(i) &&
00354 p_network->getBus(i)->serialWrite(string,p_size,signal)) {
00355 nwrites++;
00356 }
00357 }
00358 free(string);
00359
00360
00361 int *iptr;
00362 char *ptr;
00363 GA_Zero(p_maskGA);
00364 if (nwrites > 0) {
00365 std::vector<int*> index(nwrites);
00366 std::vector<int> indexbuf(nwrites);
00367 iptr = &indexbuf[0];
00368 std::vector<int> ones(nwrites);
00369 char *strbuf = NULL;
00370 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00371 ptr = strbuf;
00372 int ncnt = 0;
00373 for (i=0; i<nBus; i++) {
00374 if (ncnt >= nwrites) break;
00375 if (p_network->getActiveBus(i) &&
00376 p_network->getBus(i)->serialWrite(ptr,p_size,signal)) {
00377 index[ncnt] = iptr;
00378 *(index[ncnt]) = p_network->getGlobalBusIndex(i);
00379 ones[ncnt] = 1;
00380 ncnt++;
00381 ptr += p_size;
00382 iptr++;
00383 }
00384 }
00385
00386
00387 if (ncnt > 0) {
00388 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
00389 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
00390 }
00391 if (nwrites*p_size > 0) delete [] strbuf;
00392 }
00393 GA_Pgroup_sync(p_GAgrp);
00394
00395
00396
00397 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00398 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
00399 int lo, hi;
00400 for (i=0; i<nprocs; i++) {
00401 NGA_Distribution(p_maskGA, i, &lo, &hi);
00402 int ld = hi - lo + 1;
00403
00404 std::vector<int> imask(ld);
00405 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
00406 int j;
00407 nwrites = 0;
00408 for (j=0; j<ld; j++) {
00409 if (imask[j] == 1) {
00410 nwrites++;
00411 }
00412 }
00413
00414 if (nwrites > 0) {
00415 char *iobuf;
00416 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
00417 std::vector<int*> index(nwrites);
00418 std::vector<int> indexbuf(nwrites);
00419 iptr = &indexbuf[0];
00420 nwrites = 0;
00421 for (j=0; j<ld; j++) {
00422 if (imask[j] == 1) {
00423 index[nwrites] = iptr;
00424 *(index[nwrites]) = j + lo;
00425 nwrites++;
00426 iptr++;
00427 }
00428 }
00429 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
00430 ptr = iobuf;
00431 nwrites = 0;
00432 for (j=0; j<ld; j++) {
00433 if (imask[j] == 1) {
00434 std::string tmp = ptr;
00435 ret.push_back(tmp);
00436 ptr += p_size;
00437 nwrites++;
00438 }
00439 }
00440 if (p_size*nwrites > 0) delete [] iobuf;
00441 }
00442 }
00443 }
00444 GA_Pgroup_sync(p_GAgrp);
00445 return ret;
00446 }
00447
00448
00449 protected:
00450
00451
00452
00453
00454
00455
00456
00457 void write(std::ostream & out, const char *signal = NULL)
00458 {
00459 int nBus = p_network->numBuses();
00460 char *string;
00461 int nwrites = 0;
00462 int i;
00463 int one = 1;
00464 string = (char*)malloc(p_size*sizeof(char));
00465
00466
00467 for (i=0; i<nBus; i++) {
00468 if (p_network->getActiveBus(i) &&
00469 p_network->getBus(i)->serialWrite(string,p_size,signal)) {
00470 nwrites++;
00471 }
00472 }
00473 free(string);
00474
00475
00476 int *iptr;
00477 char *ptr;
00478 GA_Zero(p_maskGA);
00479 if (nwrites > 0) {
00480 std::vector<int*> index(nwrites);
00481 std::vector<int> indexbuf(nwrites);
00482 iptr = &indexbuf[0];
00483 std::vector<int> ones(nwrites);
00484 char *strbuf = NULL;
00485 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00486 ptr = strbuf;
00487 int ncnt = 0;
00488 for (i=0; i<nBus; i++) {
00489 if (ncnt >= nwrites) break;
00490 if (p_network->getActiveBus(i) &&
00491 p_network->getBus(i)->serialWrite(ptr,p_size,signal)) {
00492 index[ncnt] = iptr;
00493 *(index[ncnt]) = p_network->getGlobalBusIndex(i);
00494 ones[ncnt] = 1;
00495 ncnt++;
00496 ptr += p_size;
00497 iptr++;
00498 }
00499 }
00500
00501
00502 if (ncnt > 0) {
00503 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
00504 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
00505 }
00506 if (nwrites*p_size > 0) delete [] strbuf;
00507 }
00508 GA_Pgroup_sync(p_GAgrp);
00509
00510
00511
00512 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00513 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
00514 int lo, hi;
00515 for (i=0; i<nprocs; i++) {
00516 NGA_Distribution(p_maskGA, i, &lo, &hi);
00517 int ld = hi - lo + 1;
00518
00519 std::vector<int> imask(ld);
00520 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
00521 int j;
00522 nwrites = 0;
00523 for (j=0; j<ld; j++) {
00524 if (imask[j] == 1) {
00525 nwrites++;
00526 }
00527 }
00528
00529 if (nwrites > 0) {
00530 char *iobuf;
00531 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
00532 std::vector<int*> index(nwrites);
00533 std::vector<int> indexbuf(nwrites);
00534 iptr = &indexbuf[0];
00535 nwrites = 0;
00536 for (j=0; j<ld; j++) {
00537 if (imask[j] == 1) {
00538 index[nwrites] = iptr;
00539 *(index[nwrites]) = j + lo;
00540 nwrites++;
00541 iptr++;
00542 }
00543 }
00544 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
00545 ptr = iobuf;
00546 nwrites = 0;
00547 for (j=0; j<ld; j++) {
00548 if (imask[j] == 1) {
00549 #ifndef USE_GOSS
00550 out << ptr;
00551 #else
00552 if (p_channel) {
00553 p_channel_buf.append(ptr);
00554 } else {
00555 out << ptr;
00556 }
00557 #endif
00558 ptr += p_size;
00559 nwrites++;
00560 }
00561 }
00562 if (p_size*nwrites > 0) delete [] iobuf;
00563 }
00564 }
00565 }
00566 GA_Pgroup_sync(p_GAgrp);
00567 }
00568
00569
00570
00571
00572
00573
00574
00575
00576 void header(std::ostream & out, const char *str)
00577 {
00578 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00579 #ifndef USE_GOSS
00580 out << str;
00581 #else
00582 if (p_channel) {
00583 p_channel_buf.append(str);
00584 } else {
00585 out << str;
00586 }
00587 #endif
00588 }
00589 }
00590
00591 private:
00592 int p_GA_type;
00593 boost::shared_ptr<_network> p_network;
00594 int p_stringGA;
00595 int p_maskGA;
00596 int p_size;
00597 boost::shared_ptr<std::ofstream> p_fout;
00598 int p_GAgrp;
00599 #ifdef USE_GOSS
00600 gridpack::goss::GOSSUtils *p_goss;
00601 std::string p_channel_buf;
00602 bool p_channel;
00603 #endif
00604 };
00605
00606 template <class _network>
00607 class SerialBranchIO {
00608 public:
00609
00610
00611
00612
00613
00614 SerialBranchIO(int max_str_len,
00615 boost::shared_ptr<_network> network)
00616 {
00617 p_GAgrp = network->communicator().getGroup();
00618 p_GA_type = NGA_Register_type(max_str_len);
00619 p_network = network;
00620 p_size = max_str_len;
00621
00622
00623
00624 int nbranch = p_network->totalBranches();
00625 int one = 1;
00626 p_stringGA = GA_Create_handle();
00627 GA_Set_data(p_stringGA,one,&nbranch,p_GA_type);
00628 GA_Set_pgroup(p_stringGA, p_GAgrp);
00629 GA_Allocate(p_stringGA);
00630 p_maskGA = GA_Create_handle();
00631 GA_Set_data(p_maskGA,one,&nbranch,C_INT);
00632 GA_Set_pgroup(p_maskGA, p_GAgrp);
00633 GA_Allocate(p_maskGA);
00634 p_fout.reset();
00635 }
00636
00637
00638
00639
00640 ~SerialBranchIO(void)
00641 {
00642 NGA_Deregister_type(p_GA_type);
00643 GA_Destroy(p_stringGA);
00644 GA_Destroy(p_maskGA);
00645 this->close();
00646 }
00647
00648
00649
00650
00651
00652 void open(const char *filename)
00653 {
00654 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00655 this->close();
00656 p_fout.reset(new std::ofstream);
00657 p_fout->open(filename);
00658 }
00659 }
00660
00661
00662
00663
00664
00665 boost::shared_ptr<std::ofstream> getStream()
00666 {
00667 return p_fout;
00668 }
00669
00670
00671
00672
00673
00674 void setStream(boost::shared_ptr<std::ofstream> stream)
00675 {
00676 p_fout = stream;
00677 }
00678
00679
00680
00681
00682 void close()
00683 {
00684 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00685 if (p_fout) {
00686 if (p_fout->is_open()) p_fout->close();
00687 }
00688 }
00689 p_fout.reset();
00690 }
00691
00692
00693
00694
00695
00696
00697 void write(const char *signal = NULL)
00698 {
00699 if (p_fout) {
00700 write(*p_fout, signal);
00701 } else {
00702 write(std::cout, signal);
00703 }
00704 }
00705
00706
00707
00708
00709
00710
00711
00712 void header(const char *str)
00713 {
00714 if (p_fout) {
00715 header(*p_fout, str);
00716 } else {
00717 header(std::cout, str);
00718 }
00719 }
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729 template <class _data_type> void gatherData(std::vector<_data_type>
00730 &data_vector, const char* signal = NULL)
00731 {
00732 int nBranch = p_network->numBranches();
00733 if (sizeof(_data_type) > p_size) {
00734 char buf[256];
00735 sprintf(buf,"SerialBranchIO::gatherData: data_type size inconsistent"
00736 " with allocated size: data: %ld allocated: %d\n",
00737 sizeof(_data_type),p_size);
00738 printf("%s",buf);
00739 throw gridpack::Exception(buf);
00740 }
00741 _data_type data;
00742 int dsize = sizeof(_data_type);
00743 int nwrites = 0;
00744 int i;
00745 int one = 1;
00746 GA_Zero(p_maskGA);
00747
00748
00749 for (i=0; i<nBranch; i++) {
00750 if (p_network->getActiveBranch(i) &&
00751 p_network->getBranch(i)->getDataItem(&data,signal)) nwrites++;
00752 }
00753
00754
00755 int *iptr;
00756 char *ptr;
00757
00758 GA_Zero(p_maskGA);
00759 if (nwrites > 0) {
00760 std::vector<int*> index(nwrites);
00761 std::vector<int> indexbuf(nwrites);
00762 iptr = &indexbuf[0];
00763 std::vector<int> ones(nwrites);
00764 char *strbuf;
00765 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00766 ptr = strbuf;
00767 int ncnt = 0;
00768 for (i=0; i<nBranch; i++) {
00769 if (ncnt >= nwrites) break;
00770 if (p_network->getActiveBranch(i) &&
00771 p_network->getBranch(i)->getDataItem(ptr,signal)) {
00772 index[ncnt] = iptr;
00773 *(index[ncnt]) = p_network->getGlobalBranchIndex(i);
00774 ones[ncnt] = 1;
00775 ncnt++;
00776 ptr += p_size;
00777 iptr++;
00778 }
00779 }
00780
00781
00782 if (ncnt > 0) {
00783 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
00784 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
00785 }
00786 if (nwrites*p_size > 0) delete [] strbuf;
00787 }
00788 GA_Pgroup_sync(p_GAgrp);
00789
00790
00791
00792 data_vector.clear();
00793 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00794 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
00795 int lo, hi;
00796 for (i=0; i<nprocs; i++) {
00797 NGA_Distribution(p_maskGA, i, &lo, &hi);
00798 int ld = hi - lo + 1;
00799
00800 std::vector<int> imask(ld);
00801 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
00802 int j;
00803 nwrites = 0;
00804 for (j=0; j<ld; j++) {
00805 if (imask[j] == 1) {
00806 nwrites++;
00807 }
00808 }
00809
00810 if (nwrites > 0) {
00811 char *iobuf;
00812 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
00813 std::vector<int*> index(nwrites);
00814 std::vector<int> indexbuf(nwrites);
00815 iptr = &indexbuf[0];
00816 nwrites = 0;
00817 for (j=0; j<ld; j++) {
00818 if (imask[j] == 1) {
00819 index[nwrites] = iptr;
00820 *(index[nwrites]) = j + lo;
00821 nwrites++;
00822 iptr++;
00823 }
00824 }
00825 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
00826 ptr = iobuf;
00827 nwrites = 0;
00828 for (j=0; j<ld; j++) {
00829 if (imask[j] == 1) {
00830 memcpy(&data,ptr,dsize);
00831 data_vector.push_back(data);
00832 ptr += p_size;
00833 nwrites++;
00834 }
00835 }
00836 if (p_size*nwrites > 0) delete [] iobuf;
00837 }
00838 }
00839 }
00840 GA_Pgroup_sync(p_GAgrp);
00841 }
00842
00843
00844
00845
00846
00847
00848
00849 std::vector<std::string> writeStrings(const char *signal = NULL)
00850 {
00851 int nBranch = p_network->numBranches();
00852 char *string;
00853 string = new char[p_size];
00854 int nwrites = 0;
00855 int i;
00856 int one = 1;
00857 std::vector<std::string> ret;
00858
00859
00860 for (i=0; i<nBranch; i++) {
00861 if (p_network->getActiveBranch(i) &&
00862 p_network->getBranch(i)->serialWrite(string,p_size,signal)) nwrites++;
00863 }
00864 delete [] string;
00865
00866
00867 int *iptr;
00868 char *ptr;
00869 GA_Zero(p_maskGA);
00870 if (nwrites > 0) {
00871 std::vector<int*> index(nwrites);
00872 std::vector<int> indexbuf(nwrites);
00873 iptr = &indexbuf[0];
00874 std::vector<int> ones(nwrites);
00875 char *strbuf;
00876 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00877 ptr = strbuf;
00878 int ncnt = 0;
00879 for (i=0; i<nBranch; i++) {
00880 if (ncnt >= nwrites) break;
00881 if (p_network->getActiveBranch(i) &&
00882 p_network->getBranch(i)->serialWrite(ptr,p_size,signal)) {
00883 index[ncnt] = iptr;
00884 *(index[ncnt]) = p_network->getGlobalBranchIndex(i);
00885 ones[ncnt] = 1;
00886 ncnt++;
00887 ptr += p_size;
00888 iptr++;
00889 }
00890 }
00891
00892
00893 if (ncnt > 0) {
00894 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
00895 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
00896 }
00897 if (nwrites*p_size > 0) delete [] strbuf;
00898 }
00899 GA_Pgroup_sync(p_GAgrp);
00900
00901
00902
00903 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
00904 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
00905 int lo, hi;
00906 for (i=0; i<nprocs; i++) {
00907 NGA_Distribution(p_maskGA, i, &lo, &hi);
00908 int ld = hi - lo + 1;
00909
00910 std::vector<int> imask(ld);
00911 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
00912 int j;
00913 nwrites = 0;
00914 for (j=0; j<ld; j++) {
00915 if (imask[j] == 1) {
00916 nwrites++;
00917 }
00918 }
00919
00920 if (nwrites > 0) {
00921 char *iobuf;
00922 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
00923 std::vector<int*> index(nwrites);
00924 std::vector<int> indexbuf(nwrites);
00925 iptr = &indexbuf[0];
00926 nwrites = 0;
00927 for (j=0; j<ld; j++) {
00928 if (imask[j] == 1) {
00929 index[nwrites] = iptr;
00930 *(index[nwrites]) = j + lo;
00931 nwrites++;
00932 iptr++;
00933 }
00934 }
00935 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
00936 ptr = iobuf;
00937 nwrites = 0;
00938 for (j=0; j<ld; j++) {
00939 if (imask[j] == 1) {
00940 std::string tmp = ptr;
00941 ret.push_back(tmp);
00942 ptr += p_size;
00943 nwrites++;
00944 }
00945 }
00946 if (p_size*nwrites > 0) delete [] iobuf;
00947 }
00948 }
00949 }
00950 GA_Pgroup_sync(p_GAgrp);
00951 return ret;
00952 }
00953 protected:
00954
00955
00956
00957
00958
00959
00960
00961 void write(std::ostream & out, const char *signal = NULL)
00962 {
00963 int nBranch = p_network->numBranches();
00964 char *string;
00965 string = new char[p_size];
00966 int nwrites = 0;
00967 int i;
00968 int one = 1;
00969
00970
00971 for (i=0; i<nBranch; i++) {
00972 if (p_network->getActiveBranch(i) &&
00973 p_network->getBranch(i)->serialWrite(string,p_size,signal)) nwrites++;
00974 }
00975 delete [] string;
00976
00977
00978 int *iptr;
00979 char *ptr;
00980 GA_Zero(p_maskGA);
00981 if (nwrites > 0) {
00982 std::vector<int*> index(nwrites);
00983 std::vector<int> indexbuf(nwrites);
00984 iptr = &indexbuf[0];
00985 std::vector<int> ones(nwrites);
00986 char *strbuf;
00987 if (nwrites*p_size > 0) strbuf = new char[nwrites*p_size];
00988 ptr = strbuf;
00989 int ncnt = 0;
00990 for (i=0; i<nBranch; i++) {
00991 if (ncnt >= nwrites) break;
00992 if (p_network->getActiveBranch(i) &&
00993 p_network->getBranch(i)->serialWrite(ptr,p_size,signal)) {
00994 index[ncnt] = iptr;
00995 *(index[ncnt]) = p_network->getGlobalBranchIndex(i);
00996 ones[ncnt] = 1;
00997 ncnt++;
00998 ptr += p_size;
00999 iptr++;
01000 }
01001 }
01002
01003
01004 if (ncnt > 0) {
01005 NGA_Scatter(p_stringGA,strbuf,&index[0],nwrites);
01006 NGA_Scatter(p_maskGA,&ones[0],&index[0],nwrites);
01007 }
01008 if (nwrites*p_size > 0) delete [] strbuf;
01009 }
01010 GA_Pgroup_sync(p_GAgrp);
01011
01012
01013
01014 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
01015 int nprocs = GA_Pgroup_nnodes(p_GAgrp);
01016 int lo, hi;
01017 for (i=0; i<nprocs; i++) {
01018 NGA_Distribution(p_maskGA, i, &lo, &hi);
01019 int ld = hi - lo + 1;
01020
01021 std::vector<int> imask(ld);
01022 NGA_Get(p_maskGA,&lo,&hi,&imask[0],&one);
01023 int j;
01024 nwrites = 0;
01025 for (j=0; j<ld; j++) {
01026 if (imask[j] == 1) {
01027 nwrites++;
01028 }
01029 }
01030
01031 if (nwrites > 0) {
01032 char *iobuf;
01033 if (p_size*nwrites > 0) iobuf = new char[p_size*nwrites];
01034 std::vector<int*> index(nwrites);
01035 std::vector<int> indexbuf(nwrites);
01036 iptr = &indexbuf[0];
01037 nwrites = 0;
01038 for (j=0; j<ld; j++) {
01039 if (imask[j] == 1) {
01040 index[nwrites] = iptr;
01041 *(index[nwrites]) = j + lo;
01042 nwrites++;
01043 iptr++;
01044 }
01045 }
01046 NGA_Gather(p_stringGA,iobuf,&index[0],nwrites);
01047 ptr = iobuf;
01048 nwrites = 0;
01049 for (j=0; j<ld; j++) {
01050 if (imask[j] == 1) {
01051 out << ptr;
01052 ptr += p_size;
01053 nwrites++;
01054 }
01055 }
01056 if (p_size*nwrites > 0) delete [] iobuf;
01057 }
01058 }
01059 }
01060 GA_Pgroup_sync(p_GAgrp);
01061 }
01062
01063
01064
01065
01066
01067
01068
01069
01070 void header(std::ostream & out, const char *str) const
01071 {
01072 if (GA_Pgroup_nodeid(p_GAgrp) == 0) {
01073 out << str;
01074 }
01075 }
01076
01077 private:
01078 int p_GA_type;
01079 boost::shared_ptr<_network> p_network;
01080 int p_stringGA;
01081 int p_maskGA;
01082 int p_size;
01083 boost::shared_ptr<std::ofstream> p_fout;
01084 int p_GAgrp;
01085 };
01086
01087 }
01088 }
01089 #endif // _serial_io_h_